package org.lisen.rabbitmqdemo.sender;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Configuration
public class RabbitConfig {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Resource
    private RabbitTemplate rabbitTemplate;

    public static ConcurrentMap<String,Object> msgCache = new ConcurrentHashMap<>();

    /**
     * 设置rabbitTemplate
     * @return
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        logger.info("设置rabbitTemplate");

        //设置消息转换器及消息编码
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");

        //实现消息发送到exchange后接收ack回调,publisher-confirms:true
        //如果队列是可持久化的，则在消息成功持久化之后生产者收到确认消息
       rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
            if(ack) {
                logger.info("消息成功发送到exchange，id:{}", correlationData.getId());
            } else {
                /*
                 * 消息未被投放到对应的消费者队列，可能的原因：
                 * 1）发送时在未找到exchange，例如exchange参数书写错误
                 * 2）消息队列已达最大长度限制（声明队列时可配置队列的最大限制），此时
                 * 返回的cause为null。
                 */
                logger.info("******************************************************");
                logger.info("11消息发送失败: {}", cause);
            }
        }));

        //消息发送失败返回队列，publisher-returns:true
        rabbitTemplate.setMandatory(true);

        //实现消息发送的exchange，但没有相应的队列于交换机绑定时的回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String id = message.getMessageProperties().getCorrelationId();
            logger.info("消息：{} 发送失败, 应答码：{} 原因：{} 交换机: {}  路由键: {}", id, replyCode, replyText, exchange, routingKey);
        });

        return rabbitTemplate;
    }


    public static final String EXCHANGE_DIRECT = "direct_exchange";

    public static final String ROUTING_DIRECT_QUE_KEY = "direct_queue_key";

    public static final String QUEUE_DIRECT = "direct_queue";

    public static final String EXCHANGE_TOPIC = "topic_exchange";

    public static final String QUEUE_TOPIC_Q1 = "topic_queue_q1";

    public static final String QUEUE_TOPIC_Q2 = "topic_queue_q2";

    public static final String ROUTING_TOPIC_QUE_KEY = "topic_queue_#";


    /**
     * 声明Direct交换机，支持持久化
     * @return
     */
    @Bean(name="directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
        //return new DirectExchange("direct_Exchange");
    }


    /**
     * 声明一个队列，支持持久化，队列参数如下：<p>
     *
     * <li> x-message-ttl：设置消息在队列中的存活时间，单位毫秒</li>
     * <li>x-max-length: 设置队列长度限制</li>
     * <li>x-max-length-bytes: 设置队列中消息的最大字节数</li>
     * <li>x-overflow: 队列消息溢出时该队列的行为，默认为drop-head：丢弃队列头部的消息， reject-publish: 拒绝接收</li>
     * <li>x-dead-letter-exchange, x-dead-letter-routing-key: 消息因为超时或超过限制在队列中消失，这种情况发生时系统
     *    会丢失一些消息，有时这些消息是我们需要获知的，rabbitmq的死信队列可以解决这个问题，设置了x-dead-letter-exchang,
     *    x-dead-letter-routing-key(两个需要同时设定)参数，那么因为超时或超过限制而被从队列中删除的消息会推到dead-exchange
     *    中，再根据routing-key推入到dead-queue中，需要时可以从dead-queue中获取这些消息。
     * </li>
     * <li>x-max-priority: 队列所支持的优先级别，例如设置为5，表示队列支持0到5个优先级别，5最高，0最低，消息生产者可以在
     *    发送消息时可以指定消息的优先级别，消息按照优先级别从高到低的顺序发送给消息消费者。
     * </li>
     * <li>alternate-exchange: 下面简称AE，当一个消息不能被route的时候，如果exchange设定了AE，则消息会被投递到AE。
     *     如果存在AE链，则会按此继续投递，直到消息被route或AE链结束或遇到已经尝试route过消息的AE
     * </li>
     *
     * @return
     */
    @Bean(name="directQueue")
    public Queue directQueue() {

        Map<String, Object> args = new HashMap<>();

        //设置消息在队列中的存活时间，单位毫秒
        args.put("x-message-ttl",1000*60*2);

        //设置队列长度限制
        args.put("x-max-length", 1);

        //设置队列中消息的最大字节数
        //args.put("x-max-length-bytes", 12);

        //队列消息溢出后该队列的行为，如果不设置默认为drop-head：丢弃队列头部的消息
        //reject-publish：拒绝接收信息
        args.put("x-overflow", "reject-publish");

        return QueueBuilder.durable(QUEUE_DIRECT)
                .withArguments(args)
                .build();
    }

    /**
     * 通过路由键，将指定的队列绑定到指定的交换机
     * @param queue  队列
     */
    @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_DIRECT_QUE_KEY).noargs();
    }

    /**
     * 声明Topic交换机，支持持久化
     * @return
     */
    @Bean(name="topicExchange")
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC).durable(true).build();
    }

    /**
     * 声明topic队列，支持持久化
     * @return
     */
    @Bean(name="topicQueue")
    public Queue topicQueue() {
        return QueueBuilder.durable(QUEUE_TOPIC_Q1).build();
    }

    /**
     * 声明topic队列，支持持久化
     * @return
     */
    @Bean(name="topicQueue2")
    public Queue topicQueue2() {
        return QueueBuilder.durable(QUEUE_TOPIC_Q2).build();
    }

    /**
     * 绑定topicQueue队列和交换机
     * @param queue  队列
     * @param exchange 交换机
     * @return
     */
    @Bean
    public Binding topicBindingQ1(@Qualifier("topicQueue") Queue queue, @Qualifier("topicExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_TOPIC_QUE_KEY).noargs();
    }

    /**
     * 绑定topicQueue2队列到交换机
     * @param queue 队列
     * @param exchange 交换机
     * @return
     */
    @Bean
    public Binding topicBindingQ2(@Qualifier("topicQueue2") Queue queue, @Qualifier("topicExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_TOPIC_QUE_KEY).noargs();
    }

    public static final String EXCHANG_FANOUT = "fanout_exchange";

    public static final String QUEUE_FANOUT_Q1 = "fanout_queue1";

    public static final String QUEUE_FANOUT_Q2 = "fanout_queue2";

    public static final String ROUTING_FANOUT_KEY = "fanout_routing_key";


    /**
     * 声明广播交换机，支持持久化
     * @return
     */
    @Bean(name="fanoutExchange")
    public Exchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange(EXCHANG_FANOUT).durable(true).build();
    }

    /**
     * 接收广播消息队列，支持持久化
     * @return
     */
    @Bean(name="fanoutQueue1")
    public Queue fanoutQueue1() {
        return QueueBuilder.durable(QUEUE_FANOUT_Q1).build();
    }

    /**
     * 接收广播消息队列，支持持久化
     * @return
     */
    @Bean(name="fanoutQueue2")
    public Queue fanoutQueue2() {
        return QueueBuilder.durable(QUEUE_FANOUT_Q2).build();
    }

    /**
     * 将接收广播的队列1与广播交换机绑定
     * @param queue  队列
     * @param exchange 交换机
     * @return
     */
    @Bean
    public Binding fanoutBindingQ1(@Qualifier("fanoutQueue1") Queue queue, @Qualifier("fanoutExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_FANOUT_KEY).noargs();
    }

    /**
     * 将接收广播的队列2与广播交换机绑定
     * @param queue  队列
     * @param exchange 交换机
     * @return
     */
    @Bean
    public Binding fanoutBindingQ2(@Qualifier("fanoutQueue2") Queue queue, @Qualifier("fanoutExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_FANOUT_KEY).noargs();
    }

    /*
     * ------------------ 死信队列 开始----------------------
     * 以下情况下信息将会进去死信队列：
     * 1）消息被拒绝（Basic.Reject, Basic.Nack）且requeue参数被设置为false。
     * 2）消息过期（TTL）
     * 3）消息队列达到了最大长度，溢出的消息将加入死信队列
     *
     * 根据死信队列的特点，可以用来做延迟任务。
     */
    public static final String  EXCHANGE_DXL = "dxl_exchange";
    public static final String QUE_DXL = "dxl_queue";
    public static final String ROUTING_DXL_KEY = "routing_dxl_key";
    public static final String EXCHANGE_USUAL_DIRECT = "usual_direct_exchange";
    public static final String QUE_USUAL = "usual_queue";
    public static final String ROUTING_USUAL_KEY = "routing_uaual_key";


    /**
     * 死信交换机
     * @return
     */
    @Bean(name="dxlExchange")
    public Exchange dxlExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_DXL).durable(true).build();
    }


    /**
     * 死信队列
     * @return
     */
    @Bean(name="dxlQueue")
    public Queue dxlQueue() {
        return QueueBuilder.durable(QUE_DXL).build();
    }


    /**
     * 绑定死信队列和交换机
     * @return
     */
    @Bean
    public Binding bindingDXL() {
        return BindingBuilder.bind(dxlQueue()).to(dxlExchange()).with(ROUTING_DXL_KEY).noargs();
    }


    /**
     * 用于演示死信队列使用的普通交换机
     * @return
     */
    @Bean(name="usualExchange")
    public Exchange usualExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_USUAL_DIRECT).durable(true).build();
    }


    @Bean(name="usualQueue")
    public Queue usualQueue() {
        return QueueBuilder.durable(QUE_USUAL)
                .withArgument("x-message-ttl", 1000*60*2)
                .withArgument("x-max-length", 5)
                .withArgument("x-dead-letter-exchange", EXCHANGE_DXL)
                .withArgument("x-dead-letter-routing-key", ROUTING_DXL_KEY)
                .build();
    }


    @Bean
    public Binding bindingUsualQue() {
        return BindingBuilder.bind(usualQueue()).to(usualExchange()).with(ROUTING_USUAL_KEY ).noargs();
    }
    //------------------ 死信队列 结束 ----------------------

}
